
convert add_limit to pipe step based limiting #2131

Merged: 16 commits into devel on Dec 16, 2024
Conversation

@sh-rp (Collaborator) commented Dec 10, 2024

Description

Up until now we were managing limits inside a somewhat conflated function that was wrapping generators. The problem was that limits were applied before incrementals were, and that we had a certain amount of code duplication with respect to wrapping async iterators. This PR solves both issues.
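
For context, a minimal usage sketch of add_limit on a resource (the resource and pipeline names below are made up for illustration):

```py
import dlt


@dlt.resource
def numbers():
    # hypothetical endless generator, used only for illustration
    i = 0
    while True:
        yield {"n": i}
        i += 1


# with this PR, the limit is a regular pipe step applied after incremental,
# instead of a wrapper around the generator itself
pipeline = dlt.pipeline(pipeline_name="limit_demo", destination="duckdb", dev_mode=True)
pipeline.run(numbers().add_limit(10))
```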

@sh-rp sh-rp marked this pull request as ready for review December 10, 2024 15:47

netlify bot commented Dec 10, 2024

Deploy Preview for dlt-hub-docs canceled.

🔨 Latest commit: fc013f5
🔍 Latest deploy log: https://app.netlify.com/sites/dlt-hub-docs/deploys/676056ef7ec717000889d732

@sh-rp sh-rp linked an issue Dec 10, 2024 that may be closed by this pull request
@rudolfix (Collaborator) left a comment

This looks so simple now and I think the implementation is right. Two things:

  • we have a test failing in a curious way: apparently we call the REST API twice even if the limit is 1. Why? We count items at the end of the pipe but there's just a single pipe. We must investigate.
  • by counting at the end of the pipe we change the behavior. I think it makes sense... but maybe we can add a sticky flag as an argument to add_limit, so people can still stick it to the gen object and count unfiltered items as before?

dlt/extract/items.py (outdated, resolved)
```py
        return self

    def __call__(self, item: TDataItems, meta: Any = None) -> Optional[TDataItems]:
        if self.count == self.max_items:
```
Collaborator:

I'm not sure that is enough. We should close the gen when we reach max items, but in this implementation we close the gen at the end of the pipe. Not all steps are sync steps; there are, for example, steps that yield (or maybe even async steps, I don't remember). We should still return None when count > max_items (see the sketch after the list below).

I think we need to add more tests:

  • what happens if we do add_yield_map?
  • are we testing limit for async generators / iterators?
  • any differences for round robin / fifo?
  • make sure that all expected items are pushed to the transformer (this happens via the special ForkStep)
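
A minimal sketch of the suggested guard, reusing the signature and attribute names from the snippet above; closing the generator itself is left out, and the actual implementation may well differ:

```py
def __call__(self, item: TDataItems, meta: Any = None) -> Optional[TDataItems]:
    if self.count >= self.max_items:
        # anything produced after the limit was reached is dropped, even if
        # the wrapped generator has not been closed yet
        return None
    self.count += 1
    return item
```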

Collaborator Author:

I think this is all tested now, except for round robin and fifo, but I am quite sure that this will not make a difference, since rr and fifo only apply at the get_source_item level and there is no async stuff going on in add_limit (it's all taken care of already in other places).

```diff
 if validator:
-    self.add_step(validator, insert_at=step_no if step_no >= 0 else None)
+    self.add_step(validator)
```
Collaborator Author:

NOTE: I removed inserting at the same position in favor of automatic resolution via placement affinity. I think this makes more sense; I can revert to the old behavior though.

Collaborator:

IMO we should put it exactly at the previous place. Users may want to transform/filter data items before or after this step, and that must be preserved.

```py
class LimitItem(ItemTransform[TDataItem]):
    placement_affinity: ClassVar[float] = 1.1  # stick to end, right behind incremental

    def __init__(
```
Collaborator Author:

I have moved the time limit and rate limiting over from the other PR. I think the time limit is fairly uncontroversial; the rate limiting is a little bit sketchy. I think it would be really cool to implement it with this PR, but we could also add a kind of global rate limiting at the PipeIterator level that gets handed over from the DltSource to the PipeIterator and is applied in the _get_source_item function, so that a new item is only extracted from any pipe if a minimum amount of time has passed.
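
A rough illustration of that global rate limiting idea (purely hypothetical, not part of this PR; class and method names are made up) — a throttle that PipeIterator could consult before pulling the next item from any pipe:

```py
import time


class MinIntervalThrottle:
    """Hypothetical helper: blocks until at least min_interval seconds
    have passed since the previous item was extracted."""

    def __init__(self, min_interval: float) -> None:
        self.min_interval = min_interval
        self._last = 0.0

    def wait(self) -> None:
        elapsed = time.monotonic() - self._last
        if elapsed < self.min_interval:
            time.sleep(self.min_interval - elapsed)
        self._last = time.monotonic()
```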

Collaborator Author:

Update: only keeping the time limit here, which is very straightforward to implement and, I think, pretty useful.
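
For reference, a minimal sketch of how the time limit could be used (assuming a max_time argument on add_limit, as discussed here; the resource is made up for illustration):

```py
import itertools
import time

import dlt


@dlt.resource
def ticker():
    # hypothetical slow, endless resource used only for illustration
    for i in itertools.count():
        time.sleep(0.1)
        yield {"n": i}


# stop extracting from this resource after roughly 5 seconds
pipeline = dlt.pipeline(pipeline_name="time_limited", destination="duckdb", dev_mode=True)
pipeline.run(ticker().add_limit(max_time=5))
```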

@joscha (Contributor) commented Dec 12, 2024

related to #2142

@joscha (Contributor) commented Dec 13, 2024

closes #2142

@rudolfix (Collaborator) left a comment

Still missing a few things:

  1. an actual test where limit is used with incremental to do the backfilling. My take is to add this to the sql_database tests (remember to add row_order)
  2. having this example, I'd add it to the performance guide and explain how to split large backfills, esp. mentioning that records should be ordered and that data should not be retaken twice (i.e. via a WHERE clause)
  3. on Py 3.11 some tests are not passing consistently. Pls take a look

```diff
 if validator:
-    self.add_step(validator, insert_at=step_no if step_no >= 0 else None)
+    self.add_step(validator)
```
Collaborator:

IMO we should put it exactly at the previous place. Users may want to transform/filter data items before or after this step, and that must be preserved.

dlt/extract/items.py (outdated, resolved)
@sh-rp sh-rp force-pushed the feat/make_limit_a_step branch from 3fc4d90 to f109a87 on December 15, 2024 21:04
@sh-rp (Collaborator Author) commented Dec 15, 2024

> Still missing a few things:
>
> 1. an actual test where limit is used with incremental to do the backfilling. My take is to add this to the sql_database tests (remember to add row_order)
> 2. having this example, I'd add it to the performance guide and explain how to split large backfills, esp. mentioning that records should be ordered and that data should not be retaken twice (i.e. via a WHERE clause)
> 3. on Py 3.11 some tests are not passing consistently. Pls take a look

I have added a general test that combines incremental and add_limit. I will also add a nice example using the rfam database, I think, but will have to do this tomorrow.
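
For reference, a rough sketch of the batched-backfill pattern under discussion — ordered records, an incremental cursor, and add_limit to cap each run (the resource, cursor field, and fetch helper are made up; only the incremental + add_limit combination is the point):

```py
import dlt


def fetch_events_since(last_value):
    # hypothetical stand-in for an ordered query such as
    #   SELECT * FROM events WHERE updated_at > :last_value ORDER BY updated_at
    return [{"id": 1, "updated_at": "2024-01-01T00:00:00"}]


@dlt.resource
def events(
    updated_at=dlt.sources.incremental("updated_at", initial_value="1970-01-01T00:00:00")
):
    yield from fetch_events_since(updated_at.last_value)


pipeline = dlt.pipeline(pipeline_name="backfill", destination="duckdb")
# each run picks up where the previous one stopped and loads at most 1000 items;
# rerun until the backfill catches up
pipeline.run(events().add_limit(1000))
```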

@sh-rp sh-rp requested a review from rudolfix December 15, 2024 21:07
@rudolfix (Collaborator) left a comment

I'm not sure we should always wrap iterators. See my comment.

dlt/extract/utils.py (resolved)
tests/extract/test_incremental.py (resolved)
@sh-rp (Collaborator Author) commented Dec 16, 2024

@rudolfix I have delayed the iterator wrapping to the LimitItem binding now; I agree that this is probably a good idea. That said, the typing/importing is a bit messy now IMHO.


```py
resource.add_limit(10)

p = dlt.pipeline(pipeline_name="incremtal_limit", destination="duckdb", dev_mode=True)
```
Contributor:

Suggested change:
```diff
-p = dlt.pipeline(pipeline_name="incremtal_limit", destination="duckdb", dev_mode=True)
+p = dlt.pipeline(pipeline_name="incremental_limit", destination="duckdb", dev_mode=True)
```

@sh-rp sh-rp linked an issue Dec 16, 2024 that may be closed by this pull request
@rudolfix (Collaborator) left a comment

LGTM!

@sh-rp sh-rp merged commit 268768f into devel Dec 16, 2024
58 of 59 checks passed
@rudolfix rudolfix deleted the feat/make_limit_a_step branch December 19, 2024 14:48
Successfully merging this pull request may close these issues.

  • Make batched loading more convenient
  • Filesystem Source incremental loading with S3 not working correctly
3 participants